import base64
import logging
from typing import TYPE_CHECKING, ClassVar, Dict, List, Optional

from ..config import Config
from ..utils import SavedSecret
from .irresource import IRResource

if TYPE_CHECKING:
    from .ir import IR  # pragma: no cover
    from .irtls import IRAmbassadorTLS  # pragma: no cover


class IRTLSContext(IRResource):
    CertKeys: ClassVar = {
        "secret",
        "cert_chain_file",
        "private_key_file",
        "ca_secret",
        "cacert_chain_file",
        "crl_secret",
        "crl_file",
    }

    AllowedKeys: ClassVar = {
        "_ambassador_enabled",
        "_legacy",
        "alpn_protocols",
        "cert_required",
        "cipher_suites",
        "ecdh_curves",
        "hosts",
        "max_tls_version",
        "min_tls_version",
        "redirect_cleartext_from",
        "secret_namespacing",
        "sni",
    }

    AllowedTLSVersions = ["v1.0", "v1.1", "v1.2", "v1.3"]

    name: str
    hosts: Optional[List[str]]
    alpn_protocols: Optional[str]
    cert_required: Optional[bool]
    min_tls_version: Optional[str]
    max_tls_version: Optional[str]
    cipher_suites: Optional[str]
    ecdh_curves: Optional[str]
    redirect_cleartext_from: Optional[int]
    secret_namespacing: Optional[bool]
    secret_info: dict
    sni: Optional[str]

    is_fallback: bool

    _ambassador_enabled: bool
    _legacy: bool

    def __init__(
        self,
        ir: "IR",
        aconf: Config,
        rkey: str,  # REQUIRED
        name: str,  # REQUIRED
        location: str,  # REQUIRED
        namespace: Optional[str] = None,
        metadata_labels: Optional[Dict[str, str]] = None,
        kind: str = "IRTLSContext",
        apiVersion: str = "getambassador.io/v3alpha1",
        is_fallback: Optional[bool] = False,
        **kwargs,
    ) -> None:
        new_args = {
            x: kwargs[x]
            for x in kwargs.keys()
            if x in IRTLSContext.AllowedKeys.union(IRTLSContext.CertKeys)
        }

        super().__init__(
            ir=ir,
            aconf=aconf,
            rkey=rkey,
            location=location,
            kind=kind,
            name=name,
            namespace=namespace,
            metadata_labels=metadata_labels,
            is_fallback=is_fallback,
            apiVersion=apiVersion,
            **new_args,
        )

    def pretty(self) -> str:
        secret_name = self.secret_info.get("secret", "-no secret-")
        hoststr = getattr(self, "hosts", "-any-")
        fbstr = " (fallback)" if self.is_fallback else ""

        rcf = self.get("redirect_cleartext_from", None)
        rcfstr = f" rcf {rcf}" if (rcf is not None) else ""

        return f"<IRTLSContext {self.name}.{self.namespace}{rcfstr}{fbstr}: hosts {hoststr} secret {secret_name}>"

    def setup(self, ir: "IR", aconf: Config) -> bool:
        if not self.get("_ambassador_enabled", False):
            spec_count = 0
            errors = 0

            if self.get("secret", None):
                spec_count += 1

            if self.get("cert_chain_file", None):
                spec_count += 1

                if not self.get("private_key_file", None):
                    err_msg = f"TLSContext {self.name}: 'cert_chain_file' requires 'private_key_file' as well"

                    self.post_error(err_msg)
                    errors += 1

            if spec_count == 2:
                err_msg = f"TLSContext {self.name}: exactly one of 'secret' and 'cert_chain_file' must be present"

                self.post_error(err_msg)
                errors += 1

            if errors:
                return False

        # self.sourced_by(config)
        # self.referenced_by(config)

        # Assume that we have no redirect_cleartext_from...
        rcf = self.get("redirect_cleartext_from", None)

        if rcf is not None:
            try:
                self.redirect_cleartext_from = int(rcf)
            except ValueError:
                err_msg = f"TLSContext {self.name}: redirect_cleartext_from must be a port number rather than '{rcf}'"
                self.post_error(err_msg)
                self.redirect_cleartext_from = None

        # Finally, move cert keys into secret_info.
        self.secret_info = {}

        for key in IRTLSContext.CertKeys:
            if key in self:
                self.secret_info[key] = self.pop(key)

        ir.logger.debug(f"IRTLSContext setup good: {self.pretty()}")

        return True

    def resolve_secret(self, secret_name: str) -> SavedSecret:
        # Assume that we need to look in whichever namespace the TLSContext itself is in...
        namespace = self.namespace

        # You can't just always allow '.' in a secret name to span namespaces, or you end up with
        # https://github.com/datawire/ambassador/issues/1255, which is particularly problematic
        # because (https://github.com/datawire/ambassador/issues/1475) Istio likes to use '.' in
        # mTLS secret names. So we default to allowing the '.' as a namespace separator, but
        # you can set secret_namespacing to False in a TLSContext or tls_secret_namespacing False
        # in the Ambassador module's defaults to prevent that.

        secret_namespacing = self.lookup(
            "secret_namespacing", True, default_key="tls_secret_namespacing"
        )

        self.ir.logger.debug(
            f"TLSContext.resolve_secret {secret_name}, namespace {namespace}: namespacing is {secret_namespacing}"
        )

        if "." in secret_name and secret_namespacing:
            secret_name, namespace = secret_name.rsplit(".", 1)

        return self.ir.resolve_secret(self, secret_name, namespace)

    def resolve(self) -> bool:
        if self.get("_ambassador_enabled", False):
            self.ir.logger.debug("IRTLSContext skipping resolution of null context")
            return True

        # is_valid determines if the TLS context is valid
        is_valid = False

        # If redirect_cleartext_from or alpn_protocols is specified, the TLS Context is
        # valid anyway, even if secret config is invalid
        if self.get("redirect_cleartext_from", False) or self.get("alpn_protocols", False):
            is_valid = True

        # If we don't have secret info, it's worth posting an error.
        if not self.secret_info:
            self.post_error(
                "TLSContext %s has no certificate information at all?" % self.name,
                log_level=logging.DEBUG,
            )

        self.ir.logger.debug("resolve_secrets working on: %s" % self.as_json())

        # OK. Do we have a secret name?
        secret_name = self.secret_info.get("secret")
        secret_valid = True

        if secret_name:
            # Yes. Try loading it. This always returns a SavedSecret, so that our caller
            # has access to the name and namespace. The SavedSecret will evaluate non-True
            # if we found no cert though.
            ss = self.resolve_secret(secret_name)

            self.ir.logger.debug("resolve_secrets: IR returned secret %s as %s" % (secret_name, ss))

            if not ss:
                # This is definitively an error: they mentioned a secret, it can't be loaded,
                # post an error.
                self.post_error(
                    "TLSContext %s found no certificate in %s, ignoring..." % (self.name, ss.name)
                )
                self.secret_info.pop("secret")
                secret_valid = False
            else:
                # If they only gave a public key, that's an error
                if not ss.key_path:
                    self.post_error(
                        "TLSContext %s found no private key in %s" % (self.name, ss.name)
                    )
                    return False

                # So far, so good.
                self.ir.logger.debug("TLSContext %s saved secret %s" % (self.name, ss.name))

                # Update paths for this cert.
                self.secret_info["cert_chain_file"] = ss.cert_path
                self.secret_info["private_key_file"] = ss.key_path

                if ss.root_cert_path:
                    self.secret_info["cacert_chain_file"] = ss.root_cert_path

        self.ir.logger.debug(
            "TLSContext - successfully processed the cert_chain_file, private_key_file, and cacert_chain_file: %s"
            % self.secret_info
        )

        # OK. Repeat for the crl_secret.
        crl_secret = self.secret_info.get("crl_secret")
        if crl_secret:
            # They gave a secret name for the certificate revocation list. Try loading it.
            crls = self.resolve_secret(crl_secret)

            self.ir.logger.debug(
                "resolve_secrets: IR returned secret %s as %s" % (crl_secret, crls)
            )

            if not crls:
                # This is definitively an error: they mentioned a secret, it can't be loaded,
                # give up.
                self.post_error(
                    "TLSContext %s found no certificate revocation list in %s"
                    % (self.name, crls.name)
                )
                secret_valid = False
            else:
                self.ir.logger.debug(
                    "TLSContext %s saved certificate revocation list secret %s"
                    % (self.name, crls.name)
                )
                self.secret_info["crl_file"] = crls.user_path

        # OK. Repeat for the ca_secret_name.
        ca_secret_name = self.secret_info.get("ca_secret")

        if ca_secret_name:
            if not self.secret_info.get("cert_chain_file"):
                # DUPLICATED BELOW: This is an error: validation without termination isn't meaningful.
                # (This is duplicated for the case where they gave a validation path.)
                self.post_error(
                    "TLSContext %s cannot validate client certs without TLS termination" % self.name
                )
                return False

            # They gave a secret name for the validation cert. Try loading it.
            ss = self.resolve_secret(ca_secret_name)

            self.ir.logger.debug(
                "resolve_secrets: IR returned secret %s as %s" % (ca_secret_name, ss)
            )

            if not ss:
                # This is definitively an error: they mentioned a secret, it can't be loaded,
                # give up.
                self.post_error(
                    "TLSContext %s found no validation certificate in %s" % (self.name, ss.name)
                )
                secret_valid = False
            else:
                # Validation certs don't need the private key, but it's not an error if they gave
                # one. We're good to go here.
                self.ir.logger.debug("TLSContext %s saved CA secret %s" % (self.name, ss.name))
                self.secret_info["cacert_chain_file"] = ss.cert_path

                # While we're here, did they set cert_required _in the secret_?
                if ss.cert_data:
                    cert_required = ss.cert_data.get("cert_required")

                    if cert_required is not None:
                        decoded = base64.b64decode(cert_required).decode("utf-8").lower() == "true"

                        # cert_required is at toplevel, _not_ in secret_info!
                        self["cert_required"] = decoded
        else:
            # No secret is named; did they provide a file location instead?
            if self.secret_info.get("cacert_chain_file") and not self.secret_info.get(
                "cert_chain_file"
            ):
                # DUPLICATED ABOVE: This is an error: validation without termination isn't meaningful.
                # (This is duplicated for the case where they gave a validation secret.)
                self.post_error(
                    "TLSContext %s cannot validate client certs without TLS termination" % self.name
                )
                return False

        # If the secret has been invalidated above, then we do not need to check for paths down under.
        # We can return whether the TLS Context is valid or not.
        if not secret_valid:
            return is_valid

        # OK. Check paths.
        errors = 0

        # self.ir.logger.debug("resolve_secrets before path checks: %s" % self.as_json())
        for key in ["cert_chain_file", "private_key_file", "cacert_chain_file", "crl_file"]:
            path = self.secret_info.get(key, None)

            if path:
                fc = getattr(self.ir, "file_checker")
                if not fc(path):
                    self.post_error("TLSContext %s found no %s '%s'" % (self.name, key, path))
                    errors += 1
            elif (not (key == "cacert_chain_file" or key == "crl_file")) and self.get(
                "hosts", None
            ):
                self.post_error("TLSContext %s is missing %s" % (self.name, key))
                errors += 1

        if errors > 0:
            return False

        return True

    def has_secret(self) -> bool:
        # Safely verify that self.secret_info['secret'] exists -- in other words, verify
        # that this IRTLSContext is based on a Secret we load from elsewhere, rather than
        # on files in the filesystem.
        si = self.get("secret_info", {})

        return "secret" in si

    def secret_name(self) -> Optional[str]:
        # Return the name of the Secret we're based on, or None if we're based on files
        # in the filesystem.
        #
        # XXX Currently this implies a _Kubernetes_ Secret, and we might have to change
        # this later.

        if self.has_secret():
            return self.secret_info["secret"]
        else:
            return None

    def set_secret_name(self, secret_name: str) -> None:
        # Set the name of the Secret we're based on.
        self.secret_info["secret"] = secret_name

    @classmethod
    def null_context(cls, ir: "IR") -> "IRTLSContext":
        ctx = ir.get_tls_context("no-cert-upstream")

        if not ctx:
            ctx = IRTLSContext(
                ir,
                ir.aconf,
                rkey="ir.no-cert-upstream",
                name="no-cert-upstream",
                location="ir.no-cert-upstream",
                kind="null-TLS-context",
                _ambassador_enabled=True,
            )

            ir.save_tls_context(ctx)

        return ctx

    @classmethod
    def from_legacy(
        cls,
        ir: "IR",
        name: str,
        rkey: str,
        location: str,
        cert: "IRAmbassadorTLS",
        termination: bool,
        validation_ca: Optional["IRAmbassadorTLS"],
    ) -> "IRTLSContext":
        """
        Create an IRTLSContext from a legacy TLS-module style definition.

        'cert' is the TLS certificate that we'll offer to our peer -- for a termination
        context, this is our server cert, and for an origination context, it's our client
        cert.

        For termination contexts, 'validation_ca' may also be provided. It's the TLS
        certificate that we'll use to validate the certificates our clients offer. Note
        that no private key is needed or supported.

        :param ir: IR in play
        :param name: name for the newly-created context
        :param rkey: rkey for the newly-created context
        :param location: location for the newly-created context
        :param cert: information about the cert to present to the peer
        :param termination: is this a termination context?
        :param validation_ca: information about how we'll validate the peer's cert
        :return: newly-created IRTLSContext
        """
        new_args = {}

        for key in [
            "secret",
            "cert_chain_file",
            "private_key_file",
            "alpn_protocols",
            "redirect_cleartext_from",
        ]:
            value = cert.get(key, None)

            if value:
                new_args[key] = value

        if (
            ("secret" not in new_args)
            and ("cert_chain_file" not in new_args)
            and ("private_key_file" not in new_args)
        ):
            # Assume they want the 'ambassador-certs' secret.
            new_args["secret"] = "ambassador-certs"

        if termination:
            new_args["hosts"] = ["*"]

            if validation_ca and validation_ca.get("enabled", True):
                for key in ["secret", "cacert_chain_file", "cert_required"]:
                    value = validation_ca.get(key, None)

                    if value:
                        if key == "secret":
                            new_args["ca_secret"] = value
                        else:
                            new_args[key] = value

                if ("ca_secret" not in new_args) and ("cacert_chain_file" not in new_args):
                    # Assume they want the 'ambassador-cacert' secret.
                    new_args["secret"] = "ambassador-cacert"

        ctx = IRTLSContext(
            ir,
            ir.aconf,
            rkey=rkey,
            name=name,
            location=location,
            kind="synthesized-TLS-context",
            _legacy=True,
            **new_args,
        )

        return ctx


class TLSContextFactory:
    @classmethod
    def load_all(cls, ir: "IR", aconf: Config) -> None:
        assert ir

        # Save TLS contexts from the aconf into the IR. Note that the contexts in the aconf
        # are just ACResources; they need to be turned into IRTLSContexts.
        tls_contexts = aconf.get_config("tls_contexts")

        if tls_contexts is not None:
            for config in tls_contexts.values():
                ctx = IRTLSContext(ir, aconf, **config)

                if ctx.is_active():
                    ctx.referenced_by(config)
                    ctx.sourced_by(config)

                    ir.save_tls_context(ctx)
